import asyncio

import pandas as pd
import re
import httpx
import time
from diskcache.core import args_to_key
from shares.utils import auto_change_column_dtypes

from shares.cache import cache


class Query(object):
    """A single data request against the mushuju HTTP API.

    A query bundles the API method name, the requested fields, the
    method-specific parameters and the access token. Results are returned
    as a ``pandas.DataFrame`` (or ``None`` when every attempt failed) and
    can optionally be post-processed (dtype conversion, column renaming)
    and stored in the shared ``cache``.

    NOTE(review): cache keys are derived from ``method``, ``params`` and the
    two post-processing flags only -- ``fields`` is not part of the key, so
    two queries that differ only in ``fields`` share one cache entry.
    Confirm this is intended.
    """

    # Number of times a request is attempted before giving up.
    _MAX_ATTEMPTS = 5
    # Per-request timeout handed to httpx.
    _TIMEOUT = 60

    def __init__(self, method, fields, params, return_column_type_dict, return_column_name_dict, token):
        """Store the request description.

        :param method: API method name, sent as the ``from`` URL argument.
        :param fields: value sent as the ``fields`` URL argument.
        :param params: dict of extra URL arguments; also used for the cache key.
        :param return_column_type_dict: passed to ``auto_change_column_dtypes``
            when dtype conversion is requested.
        :param return_column_name_dict: maps a returned column name to its
            display name; only the text before the first full-width comma
            of the display name is used.
        :param token: API key, sent as the ``key`` URL argument.
        """
        self.method = method
        self.fields = fields
        self.params = params
        self.return_column_type_dict = return_column_type_dict
        self.return_column_name_dict = return_column_name_dict
        self.token = token

    # ------------------------------------------------------------------
    # private helpers
    # ------------------------------------------------------------------
    def _request_params(self):
        """Return the full URL argument dict for this query."""
        return {"from": self.method, "key": self.token, 'fields': self.fields, **self.params}

    def _cache_key(self, *flags):
        """Return the cache key for this query and the given flag values."""
        return args_to_key(base=(self.method,), args=flags, kwargs=self.params, typed=False, ignore=set())

    @staticmethod
    def _build_url(params):
        """Return the request URL for ``params``.

        The layout follows the API's documented form
        ``...?key=<key>&from=<method>?<name>=<value>&...`` -- the second
        ``?`` after the method name is intentional.
        """
        extra = "&".join(
            f"{name!s}={value!s}" for name, value in params.items() if name not in ('key', 'from'))
        return "https://www.mushuju.com/data?key=" + params['key'] + "&from=" + params['from'] + "?" + extra

    @staticmethod
    def _frame_from_response(response):
        """Return the payload of ``response`` as a DataFrame, or ``None``.

        ``None`` is returned (after printing the reason) when the API
        reports a code other than 200 or when the body cannot be decoded.
        """
        # Deliberately broad: any malformed reply counts as a failed attempt
        # so that the caller can retry instead of crashing.
        try:
            rsp = response.json()
            if rsp['code'] == 200:
                return pd.DataFrame(rsp['data'])
            print(rsp['message'])
        except Exception as e:
            print(e)
        return None

    # ------------------------------------------------------------------
    # post-processing
    # ------------------------------------------------------------------
    def change_type_and_column_names(self, df, cn_column_names=False, change_types=False):
        """Optionally convert dtypes and/or rename columns of ``df``.

        :param df: the frame to process.
        :param cn_column_names: rename columns found in
            ``return_column_name_dict`` to their display names. The rename
            is done in place on the frame being returned.
        :param change_types: run ``auto_change_column_dtypes`` with
            ``return_column_type_dict``.
        :return: the processed frame (``df`` itself when both flags are false).
        """
        if not cn_column_names and not change_types:
            return df
        renames = None
        if cn_column_names:
            # Resolved against the columns as they are before dtype conversion.
            renames = {
                col: self.return_column_name_dict[col].split("，")[0]
                for col in df.columns.values
                if col in self.return_column_name_dict
            }
        if change_types:
            df = auto_change_column_dtypes(df, self.return_column_type_dict)
        if renames is not None:
            df.rename(columns=renames, inplace=True)
        return df

    # ------------------------------------------------------------------
    # cache handling
    # ------------------------------------------------------------------
    def clean_cache(self):
        """Remove every cache entry stored for this query.

        Results are stored per ``(cn_column_names, change_types)``
        combination, so all four boolean combinations are deleted. The
        flag-less key is deleted as well for backward compatibility.
        """
        keys = [self._cache_key()]
        keys.extend(
            self._cache_key(cn_column_names, change_types)
            for cn_column_names in (False, True)
            for change_types in (False, True)
        )
        with cache:
            for key in keys:
                cache.delete(key)

    def cache_or_get(self, cn_column_names=False, change_types=True):
        """Return the cached result, or fetch, post-process and cache it.

        A cached frame was post-processed before it was stored, so it is
        returned as is. Returns ``None`` when nothing is cached and the
        request failed.
        """
        key = self._cache_key(cn_column_names, change_types)
        with cache:
            cached = cache.get(key)
        if cached is not None:
            return cached
        df = self.__call_api(None, self._request_params())
        if df is not None:
            df = self.change_type_and_column_names(df, cn_column_names, change_types=change_types)
            with cache:
                cache.set(key, df, tag="api")
        return df

    async def async_cache_or_get(self, cn_column_names=False, change_types=True):
        """Asynchronous counterpart of :meth:`cache_or_get`."""
        key = self._cache_key(cn_column_names, change_types)
        with cache:
            cached = cache.get(key)
        if cached is not None:
            return cached
        df = await self.__async_call_api(None, self._request_params())
        if df is not None:
            df = self.change_type_and_column_names(df, cn_column_names, change_types=change_types)
            with cache:
                cache.set(key, df, tag='api')
        return df

    # ------------------------------------------------------------------
    # HTTP access
    # ------------------------------------------------------------------
    @staticmethod
    async def __async_call_api(df, params):
        """Asynchronously request ``params``; see ``__call_api``.

        One client is shared by all attempts. Errors raised by httpx while
        sending the request propagate to the caller.
        """
        url = Query._build_url(params)
        async with httpx.AsyncClient() as client:
            for _ in range(Query._MAX_ATTEMPTS):
                response = await client.get(url, timeout=Query._TIMEOUT)
                frame = Query._frame_from_response(response)
                if frame is not None:
                    return frame
        return df

    @staticmethod
    def __call_api(df, params):
        """Request ``params`` and return the payload as a DataFrame.

        Example URL:
        https://www.mushuju.com/data?key=<your key>&from=getStockHKDayKLine?code=00001,00002&ktype=101&startDate=2022-08-16&endDate=2100-01-01&fields=code,name,ktype

        The request is attempted up to ``_MAX_ATTEMPTS`` times; an attempt
        fails when the reply is not decodable or its code is not 200.
        Errors raised by httpx while sending the request propagate to the
        caller.

        :param df: value returned when every attempt fails.
        :param params: URL arguments; must contain ``key`` and ``from``.
        """
        url = Query._build_url(params)
        for _ in range(Query._MAX_ATTEMPTS):
            response = httpx.get(url, timeout=Query._TIMEOUT)
            frame = Query._frame_from_response(response)
            if frame is not None:
                return frame
        return df

    # ------------------------------------------------------------------
    # uncached access
    # ------------------------------------------------------------------
    def get(self, cn_column_names=False, change_types=True):
        """Fetch and post-process the result, bypassing the cache.

        Returns ``None`` when the request failed.
        """
        df = self.__call_api(None, self._request_params())
        if df is not None:
            df = self.change_type_and_column_names(df, cn_column_names, change_types=change_types)
        return df

    async def async_get(self, cn_column_names=False, change_types=True):
        """Asynchronous counterpart of :meth:`get`."""
        df = await self.__async_call_api(None, self._request_params())
        if df is not None:
            df = self.change_type_and_column_names(df, cn_column_names, change_types=change_types)
        return df


# Names exported by a star-import of this module: only the Query class.
__all__ = ['Query']
